/*
 * Unidata Platform
 * Copyright (c) 2013-2020, UNIDATA LLC, All rights reserved.
 *
 * Commercial License
 * This version of Unidata Platform is licensed commercially and is the appropriate option for the vast majority of use cases.
 *
 * Please see the Unidata Licensing page at: https://unidata-platform.com/license/
 * For clarification or additional options, please contact: info@unidata-platform.com
 * -------
 * Disclaimer:
 * -------
 * THIS SOFTWARE IS DISTRIBUTED "AS-IS" WITHOUT ANY WARRANTIES, CONDITIONS AND
 * REPRESENTATIONS WHETHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE
 * IMPLIED WARRANTIES AND CONDITIONS OF MERCHANTABILITY, MERCHANTABLE QUALITY,
 * FITNESS FOR A PARTICULAR PURPOSE, DURABILITY, NON-INFRINGEMENT, PERFORMANCE AND
 * THOSE ARISING BY STATUTE OR FROM CUSTOM OR USAGE OF TRADE OR COURSE OF DEALING.
 */
package org.unidata.mdm.dq.core.service.impl.function.system.misc;

import static org.unidata.mdm.dq.core.type.constant.CleanseConstants.INPUT_PORT_1;
import static org.unidata.mdm.dq.core.type.constant.CleanseConstants.OUTPUT_PORT_1;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.unidata.mdm.core.type.data.ArrayAttribute;
import org.unidata.mdm.core.type.data.ArrayAttribute.ArrayDataType;
import org.unidata.mdm.core.type.data.Attribute;
import org.unidata.mdm.core.type.data.Attribute.AttributeType;
import org.unidata.mdm.core.type.data.CodeAttribute;
import org.unidata.mdm.core.type.data.CodeAttribute.CodeDataType;
import org.unidata.mdm.core.type.data.ComplexAttribute;
import org.unidata.mdm.core.type.data.SimpleAttribute;
import org.unidata.mdm.core.type.data.SimpleAttribute.SimpleDataType;
import org.unidata.mdm.core.type.data.SingleValueAttribute;
import org.unidata.mdm.dq.core.context.CleanseFunctionContext;
import org.unidata.mdm.dq.core.dto.CleanseFunctionResult;
import org.unidata.mdm.dq.core.service.impl.function.system.AbstractSystemCleanseFunction;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionConfiguration;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionExecutionScope;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionInputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionOutputParam;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortFilteringMode;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortInputType;
import org.unidata.mdm.dq.core.type.cleanse.CleanseFunctionPortValueType;
import org.unidata.mdm.dq.core.type.constant.CleanseConstants;
import org.unidata.mdm.system.util.TextUtils;

/**
 * Cleanse function, which checks that all values, selected by the input port, exist.
 * <p>
 * The function emits a single boolean on the output port: {@code true} only if the input is present,
 * is not empty, has no incomplete paths and every selected attribute holds a value.
 * Each attribute (or incomplete path), that fails the check, is registered as a spot on the result.
 *
 * @author ilya.bykov
 */
public class CheckExists extends AbstractSystemCleanseFunction {
    /**
     * Display name code.
     */
    private static final String FUNCTION_DISPLAY_NAME = "app.dq.functions.system.misc.exists.display.name";
    /**
     * Description code.
     */
    private static final String FUNCTION_DESCRIPTION = "app.dq.functions.system.misc.exists.decsription";
    /**
     * IP1 name code.
     */
    private static final String INPUT_PORT_1_NAME = "app.dq.functions.system.misc.exists.input.port1.name";
    /**
     * IP1 description code.
     */
    private static final String INPUT_PORT_1_DESCRIPTION = "app.dq.functions.system.misc.exists.input.port1.decsription";
    /**
     * OP1 name code.
     */
    private static final String OUTPUT_PORT_1_NAME = "app.dq.functions.system.misc.exists.output.port1.name";
    /**
     * OP1 description code.
     */
    private static final String OUTPUT_PORT_1_DESCRIPTION = "app.dq.functions.system.misc.exists.output.port1.decsription";
    /**
     * This function configuration: one optional input port, accepting attributes of any kind and value type,
     * and one required boolean output port.
     */
    private static final CleanseFunctionConfiguration CONFIGURATION
        = CleanseFunctionConfiguration.configuration()
            .supports(CleanseFunctionExecutionScope.LOCAL, CleanseFunctionExecutionScope.GLOBAL)
            .input(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.INPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(INPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(INPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL_WITH_INCOMPLETE)
                    .inputTypes(
                            CleanseFunctionPortInputType.SIMPLE,
                            CleanseFunctionPortInputType.ARRAY,
                            CleanseFunctionPortInputType.CODE,
                            CleanseFunctionPortInputType.COMPLEX)
                    .valueTypes(CleanseFunctionPortValueType.ANY)
                    .required(false)
                    .build())
            .output(CleanseFunctionConfiguration.port()
                    .name(CleanseConstants.OUTPUT_PORT_1)
                    .displayName(() -> TextUtils.getText(OUTPUT_PORT_1_NAME))
                    .description(() -> TextUtils.getText(OUTPUT_PORT_1_DESCRIPTION))
                    .filteringMode(CleanseFunctionPortFilteringMode.MODE_ALL)
                    .inputTypes(CleanseFunctionPortInputType.SIMPLE)
                    .valueTypes(CleanseFunctionPortValueType.BOOLEAN)
                    .required(true)
                    .build())
            .build();
    /**
     * Instantiates a new 'check exists' cleanse function.
     */
    public CheckExists() {
        super("CheckExists", () -> TextUtils.getText(FUNCTION_DISPLAY_NAME), () -> TextUtils.getText(FUNCTION_DESCRIPTION));
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public CleanseFunctionConfiguration configure() {
        return CONFIGURATION;
    }
    /**
     * Checks the values, supplied on the input port 1, and writes the verdict to the output port 1.
     * <p>
     * The verdict is {@code false}, if the input param is absent, if at least one attribute
     * fails the emptiness check, if the input reports incomplete paths or if the input itself is empty.
     * Failed attributes and incomplete paths are added to the result as spots.
     *
     * @param ctx the execution context
     * @return the result, holding the boolean output param and the collected spots
     */
    @Override
    public CleanseFunctionResult execute(CleanseFunctionContext ctx) {

        CleanseFunctionResult output = new CleanseFunctionResult();
        CleanseFunctionInputParam input = ctx.getInputParam(INPUT_PORT_1);

        // No input at all means 'does not exist'.
        boolean exists = false;
        if (Objects.nonNull(input)) {

            boolean holdsNoEmptyValues = true;

            List<Pair<String, Attribute>> failedPaths = checkEmptyValues(input);
            if (CollectionUtils.isNotEmpty(failedPaths)) {
                holdsNoEmptyValues = false;
                failedPaths.forEach(pair -> output.addSpot(pair.getKey(), pair.getValue()));
            }

            // Incomplete paths have no attribute to report, thus the spot carries the path only.
            if (CollectionUtils.isNotEmpty(input.getIncomplete())) {
                holdsNoEmptyValues = false;
                input.getIncomplete().forEach(element -> output.addSpot(element.toLocalPath(), null));
            }

            exists = holdsNoEmptyValues && !input.isEmpty();
        }

        output.putOutputParam(CleanseFunctionOutputParam.of(OUTPUT_PORT_1, exists));
        return output;
    }
    /**
     * Collects the attributes of the input, which do not hold a value.
     *
     * @param input the input to check
     * @return list of (local path, attribute) pairs for every attribute, that failed the check; empty list, if none failed
     */
    private List<Pair<String, Attribute>> checkEmptyValues(CleanseFunctionInputParam input) {

        List<Pair<String, Attribute>> failed = new ArrayList<>();
        for (Attribute attribute : input.getAttributes()) {
            if (isMissingValue(attribute)) {
                failed.add(ImmutablePair.of(attribute.toLocalPath(), attribute));
            }
        }

        return failed;
    }
    /**
     * Tells whether the given attribute fails the existence check.
     *
     * @param attribute the attribute to check
     * @return true, if the attribute is empty, is an array with an empty element,
     * is a complex attribute with a zero-sized record or is a blank string simple/code attribute
     */
    private boolean isMissingValue(Attribute attribute) {

        if (attribute.isEmpty()) {
            return true;
        }

        AttributeType type = attribute.getAttributeType();
        if (type == AttributeType.ARRAY) {
            return hasEmptyElement(attribute);
        }

        if (type == AttributeType.COMPLEX) {
            return ((ComplexAttribute) attribute).stream().anyMatch(nested -> nested.getSize() == 0);
        }

        SingleValueAttribute<?> sa = attribute.narrow();
        return isBlankString(sa);
    }
    /**
     * Tells whether an array attribute contains an empty element.
     * An element is empty, if it is null or, for string arrays, if its string form is blank.
     *
     * @param attribute the attribute of type ARRAY
     * @return true, if at least one element is empty
     */
    @SuppressWarnings("unchecked")
    private boolean hasEmptyElement(Attribute attribute) {

        ArrayAttribute<String> array = (ArrayAttribute<String>) attribute;

        // The data type is the same for all elements, so resolve it once.
        boolean stringArray = array.getDataType() == ArrayDataType.STRING;
        return array.getValues().stream()
                // Null check first: calling toString() on a null element would throw NPE.
                .anyMatch(obj -> Objects.isNull(obj) || (stringArray && StringUtils.isBlank(obj.toString())));
    }
    /**
     * Tells whether a single value attribute is a string typed CODE or SIMPLE attribute with a blank value.
     *
     * @param sa the single value attribute
     * @return true for blank string values, false otherwise (including all non-string data types)
     */
    @SuppressWarnings("unchecked")
    private boolean isBlankString(SingleValueAttribute<?> sa) {

        if (sa.getAttributeType() == AttributeType.CODE) {
            return ((CodeAttribute<?>) sa).getDataType() == CodeDataType.STRING
                && StringUtils.isBlank(((CodeAttribute<String>) sa).getValue());
        }

        if (sa.getAttributeType() == AttributeType.SIMPLE) {
            return ((SimpleAttribute<?>) sa).getDataType() == SimpleDataType.STRING
                && StringUtils.isBlank(((SimpleAttribute<String>) sa).getValue());
        }

        return false;
    }
}
